package com.cantor.provider.invoker.impl;

import com.cantor.common.exception.ServiceInvokeException;
import com.cantor.common.util.CantorUtil;
import com.cantor.core.message.CantorResponseMessage;
import com.cantor.core.pool.CantorExecutorPool;
import com.cantor.provider.invoker.ProviderServiceInvoker;
import com.cantor.provider.pojo.ServiceBoat;
import com.cantor.provider.spi.ExecutableAfterServiceRun;
import com.cantor.provider.spi.ExecutableBeforeServiceRun;
import com.cantor.provider.spi.ExecutableWhileServiceRunning;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Method;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Provider端的服务执行器, 在netty的handler中被new, 并异步调用这个的invoke方法
 */
@Slf4j
public class GenericProviderServiceInvoker implements ProviderServiceInvoker {

    // 要回写的ChannelContext
    private ChannelHandlerContext ctx;

    // 需要ServiceBoat
    private ServiceBoat serviceBoat;

    // 具体的服务的impl
    private Object serviceImpl;

    // 要调用的方法
    private Method method;

    // 方法参数值数组
    private Object[] paramValues;

    // 本次回复的sequenceId
    private long sequenceId;

    // 构造方法
    public GenericProviderServiceInvoker(ChannelHandlerContext ctx, ServiceBoat serviceBoat, Object serviceImpl, Method method, Object[] paramValues, long sequenceId) {
        this.ctx = ctx;
        this.serviceBoat = serviceBoat;
        this.serviceImpl = serviceImpl;
        this.method = method;
        this.paramValues = paramValues;
        this.sequenceId = sequenceId;
    }

    @Override
    public void invoke() {
        // 1. 执行服务前置SPI (用户可能会修改paramValues)
        log.debug("执行服务前置SPI..");
        ServiceLoader.load(ExecutableBeforeServiceRun.class).forEach(action -> action.execute(paramValues));
        // 2. 执行与服务同时运行的SPI
        CompletableFuture.runAsync(() -> {
            log.debug("执行服务同时SPI..");
            ServiceLoader.load(ExecutableWhileServiceRunning.class).forEach(ExecutableWhileServiceRunning::execute);
        }, CantorExecutorPool.pool());
        // 3. 执行服务业务
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            @SneakyThrows
            public Object get() {
                return method.invoke(serviceImpl, paramValues);
            }
        }, CantorExecutorPool.pool())
                // 超时检测
                .applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
                    CantorUtil.threadSleep(serviceBoat.getTimeout());
                    return new TimeoutException("远程服务执行超时");
                }), new Function<Object, Object>() {
                    @Override
                    @SneakyThrows
                    public Object apply(Object rOrE) {
                        if (rOrE instanceof TimeoutException) {
                            throw (Throwable) rOrE;
                        }
                        return rOrE;
                    }
                }, CantorExecutorPool.pool())
                .exceptionally(e -> {
                    e.printStackTrace();
                    log.error("服务{}.{}(...)执行失败,可能是超时,或者服务本身执行异常", serviceImpl.getClass(), method.getName());
                    return new ServiceInvokeException(e.getMessage());
                });
        // 最终结果(可能是异常, 也可能是正确返回值)
        Object finalResult = future.join();
        // 准备Response对象
        CantorResponseMessage.CantorResponseMessageBuilder builder = CantorResponseMessage.builder();
        // 根据返回值信息构建Response对象
        // 如果是已经被捕捉到的异常,本地调用算失败
        if (finalResult instanceof ServiceInvokeException) {
            builder.exceptionValue((ServiceInvokeException) finalResult);
        } else if (finalResult instanceof Exception && !(finalResult instanceof ServiceInvokeException)) {
            // 如果是异常, 但是是意料之外的, 按致命错误处理
            throw new Error("未知错误");
        } else {
            // 4. 执行服务后置SPI
            log.debug("执行服务后置SPI..");
            AtomicReference<Object> tempResult = new AtomicReference<>(finalResult);
            ServiceLoader.load(ExecutableAfterServiceRun.class).forEach(executable -> tempResult.set(executable.execute(tempResult.get())));
            builder.returnValue(tempResult.get());
        }
        // build and write
        CantorResponseMessage res = builder.build();
        res.setSequenceId(sequenceId);
        ctx.channel().writeAndFlush(res).addListener((ChannelFutureListener) channelFuture -> log.debug("{} >>> Response >>> {}",ctx.channel().localAddress(),ctx.channel().remoteAddress()));
    }
}
